package com.sailing.lianxi.demoservice.stream;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

import com.sailing.lianxi.common.Constants;

import java.util.Locale;
import java.util.Properties;
 
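/**
 * Word-count demo built on the Kafka Streams low-level Processor API
 * (TopologyBuilder): a source node reads lines from the input topic, a custom
 * Processor maintains per-word counts in a "Counts" state store, and
 * punctuate() periodically forwards the counts to a sink topic.
 */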
public class WordCount {
 
    private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
 
        /**
         * Defines the processing logic applied to each record, demonstrating
         * that Kafka Streams is capable of record-level data processing.
         */
        @Override
        public Processor<String, String> get() {
            return new Processor<String, String>() {
                private ProcessorContext context;
                private KeyValueStore<String, Integer> kvStore;
 
                @Override
                @SuppressWarnings("unchecked")
                public void init(ProcessorContext context) {
                    this.context = context;
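                    // in this (old, 0.10.x) Processor API, schedule() requests a
                    // punctuate() call roughly every 1000 ms of stream time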
                    this.context.schedule(1000);
                    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
                }
 
                @Override
                public void process(String dummy, String line) {
                    // split the line into words and update the running count for each
                    String[] words = line.toLowerCase(Locale.getDefault()).split(" ");

                    for (String word : words) {
                        Integer oldValue = this.kvStore.get(word);

                        if (oldValue == null) {
                            this.kvStore.put(word, 1);
                        } else {
                            this.kvStore.put(word, oldValue + 1);
                        }
                    }

                    // request a commit of the current processing progress
                    context.commit();
                }
 
                @Override
                public void punctuate(long timestamp) {
                    // on every punctuation, print the current counts and forward them to the sink
                    try (KeyValueIterator<String, Integer> iter = this.kvStore.all()) {
                        System.out.println("----------- " + timestamp + " ----------- ");

                        while (iter.hasNext()) {
                            KeyValue<String, Integer> entry = iter.next();

                            System.out.println("[" + entry.key + ", " + entry.value + "]");

                            context.forward(entry.key, entry.value.toString());
                        }
                    }
                }
 
                @Override
                public void close() {
                    this.kvStore.close();
                }
            };
        }
    }
 
    public static void main(String[] args) throws Exception {
        Properties props = getProperties();
 
        TopologyBuilder builder = new TopologyBuilder();
 
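        // source node: consumes records from the input topic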
        builder.addSource("Source", Constants.TOPIC_NAME);
 
        builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
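        // attach an in-memory key-value store named "Counts" to the "Process" node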
        builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
 
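        // sink node: writes whatever "Process" forwards to the output topic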
        builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
 
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
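
        // while it runs, the app can be exercised from a shell; a sketch, assuming a
        // local broker and the standard Kafka CLI tools (paths and addresses are
        // environment-specific assumptions):
        //   bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <input-topic>
        //   bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        //       --topic streams-wordcount-processor-output --from-beginning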
 
        // a streams application would usually run forever; in this example we
        // let it run briefly and then stop, since the input data is finite
        Thread.sleep(5000L);
 
        streams.close();
    }
    
    public static Properties getProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-count-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_SERVERS);
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, Constants.KAFKA_ZOOKEEPER_URL);
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // reset offsets to earliest so the demo can be re-run with the same pre-loaded data
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return props;
    }
    
}